package com.shujia.trip;

import com.shujia.util.FileUtil;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 * Command-line entry point that reads a SQL script from a file and runs each
 * of its statements, in order, on a Flink {@link TableEnvironment} created in
 * streaming mode.
 *
 * <p>Usage: {@code FlinkSqlRun <sql-file>}. The script may contain several
 * statements separated by {@code ;}. Semicolons that appear inside quoted
 * text or inside SQL comments do not end a statement.
 *
 * @author shujia
 */
public class FlinkSqlRun {
    /**
     * Runs the SQL script named by the first command-line argument.
     *
     * @param args {@code args[0]} is the path of the SQL file to execute; when
     *             no argument is given a usage message is printed and the
     *             method returns without touching Flink
     */
    public static void main(String[] args) {
        // Validate the arguments first so a usage error does not pay for
        // building a Flink environment.
        if (args.length == 0) {
            System.out.println("请指定需要执行的sql文件");
            return;
        }
        String sqlPath = args[0];

        // Environment settings: streaming mode (use inBatchMode() on the
        // builder instead to run the script as a batch job).
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();

        // Create the Flink SQL environment.
        TableEnvironment table = TableEnvironment.create(settings);

        // Load the whole script into memory.
        String sqlStr = FileUtil.readFile(sqlPath);

        // Echo the script so the submitted SQL shows up in the job's output.
        System.out.println("============================正在执行的sql===============================");
        System.out.println(sqlStr);
        System.out.println("======================================================================");

        // Execute the statements one by one, in script order.
        for (String sql : splitStatements(sqlStr)) {
            table.executeSql(sql);
        }
    }

    /**
     * Splits a SQL script into individual statements at top-level semicolons.
     *
     * <p>A semicolon is NOT treated as a separator when it occurs inside a
     * quoted region ({@code '...'}, {@code "..."} or {@code `...`}), inside a
     * {@code --} line comment, or inside a block comment. A doubled quote
     * used as an escape (for example {@code 'it''s'}) is handled because the
     * second quote simply opens a new quoted region. Backslash escapes are
     * not interpreted.
     *
     * <p>Fragments that contain nothing but whitespace and comments are
     * dropped. The text of each returned statement is otherwise unchanged
     * (comments are kept, nothing is trimmed) and never includes the
     * terminating semicolon.
     *
     * @param script the full script text
     * @return the statements in script order; empty if the script has none
     */
    private static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        // True once the current fragment holds a non-blank character that is
        // outside a comment, i.e. something worth executing.
        boolean hasCode = false;
        int length = script.length();
        int i = 0;

        while (i < length) {
            char c = script.charAt(i);

            if (c == '\'' || c == '"' || c == '`') {
                // Quoted region: copy through the matching closing quote, or
                // to the end of the script when the quote is unterminated.
                int close = script.indexOf(c, i + 1);
                int stop = close < 0 ? length : close + 1;
                current.append(script, i, stop);
                hasCode = true;
                i = stop;
            } else if (c == '-' && i + 1 < length && script.charAt(i + 1) == '-') {
                // Line comment: copy up to (not including) the line break.
                int eol = script.indexOf('\n', i);
                int stop = eol < 0 ? length : eol;
                current.append(script, i, stop);
                i = stop;
            } else if (c == '/' && i + 1 < length && script.charAt(i + 1) == '*') {
                // Block comment: copy through the closing marker.
                int close = script.indexOf("*/", i + 2);
                int stop = close < 0 ? length : close + 2;
                current.append(script, i, stop);
                i = stop;
            } else if (c == ';') {
                // Top-level separator: finish the current statement.
                if (hasCode) {
                    statements.add(current.toString());
                }
                current.setLength(0);
                hasCode = false;
                i++;
            } else {
                // Same notion of "blank" as String.trim(): chars <= ' '.
                if (c > ' ') {
                    hasCode = true;
                }
                current.append(c);
                i++;
            }
        }

        // The last statement does not need a trailing semicolon.
        if (hasCode) {
            statements.add(current.toString());
        }
        return statements;
    }
}
